AWS Clean Roomsで取得したデータを自アカウントのAthenaで利用する

AWS Clean Roomsで取得したデータを自アカウントのAthenaで利用する

AWS Clean Roomsで取得したデータを、データマートとして自アカウントのAthenaで利用する際の動線について考えてみました。
Clock Icon2023.06.19

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

データアナリティクス事業本部の鈴木です。

今年3月に一般提供開始となったAWS Clean Roomsですが、Analysis BuilderやCloudFormationへの対応など、続々とアップデートが出てきていますね。

どんどんとブラッシュアップされているAWS Clean Roomsを活用するにあたって、改めてAWS Clean Roomsでクロスアカウント連携したデータをどのように使うとよさそうか、クエリ結果をデータマートとしてAthenaで利用する例で少し考えてみたのでブログにしました。

簡単なものにはなりますが、AWS Clean Roomsの活用のヒントになりましたら幸いです。

検討した構成について

AWS Clean Roomsでプロデューサー側のアカウントのデータを取得し、コンシューマーである自アカウントで利用する構成の例として、以下のようなものを考えました。

構成

プロデューサー側のアカウントで参照するテーブルは、データマートとしています。必ずしもデータマートのように集計済みのものでなくても構いませんが、あまりに生データに近いものだと処理データ量が多くなってしまい、消費するCRPUが非常に大きくなってしまう場合があるかと思います。

以下では、この構成が実際にできそうなものなのか確認してみます。

コンソールからクエリを実行する場合

まず、AWS Clean Roomsのコンソールから手でクエリを実行する場合のことを考えてみます。

AWS Clean Roomsのセットアップは、以下の記事に従って行ったものとします。

クエリの実行例

クエリ結果設定は以下のようにされているとします。

クエリ結果設定

クエリエディタからクエリを実行してみました。

クエリエディタからの実行

設定通り、以下のようにS3にクエリ結果が出力されました。

クエリエディタからのクエリ結果

出力結果から分かるように、クエリ結果設定したパスの下にクエリIDのディレクトリが掘られて結果が出力されることが分かりました。

クエリのAthenaからの検索例

以下のようなGlueテーブルの定義で、Athenaから検索することは可能ですが、コンソールからのクエリ実行のたびに、異なるクエリIDでデータが保存されていくので冪等性の担保に工夫が必要そうです。

-- <AWS Clean Roomsからクエリ結果を保存するS3バケット名>は適当なものに置き換えてください。
CREATE EXTERNAL TABLE `database1.iris_avg` (
  `sepal_length_avg` double,
  `sepal_width_avg` double,
  `petal_length_avg` double,
  `petal_width_avg` double,
  `class` string
  )
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<AWS Clean Roomsからクエリ結果を保存するS3バケット名>/iris/'
TBLPROPERTIES ('has_encrypted_data'='false')

Partition射影のinjected型を使ってクエリIDを指定して検索するようなテーブル定義も可能ですが、AthenaのユーザーがクエリIDを把握することは現実的ではないので、クエリ結果設定したパスを参照するGlueテーブルを作成し、その下のデータは毎回洗い替えするような運用になりそうです。

上記のテーブル定義だと、以下のようにAthenaから検索ができました。

Athenaからの検索結果

APIからクエリを実行する場合

手動でコンソールから実行する場合の例を考えましたが、例えば日次実行で差分データを取得するような仕組みにしたい場合は、可能であれば実行を自動化したいところです。

クエリはAPIから実行することが可能なので、確認してみました。

実行方法の例

AWS Clean Roomsのクエリの開始は、StartProtectedQueryアクションで可能です。

私のよく使う範囲だと、以下のツールから実行できることを確認しました。

例として、Boto3からの実行を試してみます。

Pythonからクエリを実行してみる

簡単なコードスニペットですが、以下のようにBoto3を使った例を試してみました。

今回は日次差分でクエリを行う想定で、yyyymmddの階層の下にクエリ実行日のデータを格納するようなことが可能か試してみました。

コラボレーションID保存先のバケット名は自分の環境にあったものに変えてください。

import boto3


def main():
    query_string = """
    select 
      AVG(sepal_length) as sepal_length_avg,
      AVG(sepal_width) as sepal_width_avg,
      AVG(petal_length) as petal_length_avg,
      AVG(petal_width) as petal_width_avg,
      class
    from iris_quality_check
    group by class
    """

    yyyymmdd = "20230619"

    client = boto3.client('cleanrooms')
    response = client.start_protected_query(
        type='SQL',
        membershipIdentifier='コラボレーションID',
        sqlParameters={
            'queryString': query_string
        },
        resultConfiguration={
            'outputConfiguration': {
                's3': {
                    'resultFormat': 'PARQUET',
                    'bucket': '保存先のバケット名',
                    'keyPrefix': f'iris/{yyyymmdd}'
                }
            }
        }
    )

    print(response)


if __name__ == "__main__":
    main()

プロファイルを使いたかったので、以下のように実行しました。利用するプロファイル名は自分の環境にあったものに変えてください。

export AWS_SDK_LOAD_CONFIG=1   
export AWS_PROFILE=利用するプロファイル名

python3 start_protectied_query.py

コンソールから履歴を確認すると、以下のようにクエリが実行されていました。

クエリの実行結果

完了するとS3バケットに、日付の階層を掘ってデータを格納できたことが確認できました。

日付階層ありのクエリ結果

クエリのAthenaからの検索例

記事執筆時点では、『ProtectedQueryS3OutputConfiguration - AWS Clean Rooms』に記載のkeyPrefix[\w!.*/-]*のパターンを満たす必要があるため、出力先のキープレフィクスに=を含めることはできず、パーティションを利用するにはAthenaのパーティション射影の利用が前提になりそうでした。

パーティション射影を使ったテーブルを使い、Athenaで結果がクエリできることを確認しました。

パーティション射影を有効としたテーブルからの検索結果

テーブル定義は以下です。

-- <AWS Clean Roomsからクエリ結果を保存するS3バケット名>は適当なものに置き換えてください。
CREATE EXTERNAL TABLE `database1.iris_avg_partitioned` (
  `sepal_length_avg` double,
  `sepal_width_avg` double,
  `petal_length_avg` double,
  `petal_width_avg` double,
  `class` string
  )
PARTITIONED BY ( 
  yyyymmdd string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<AWS Clean Roomsからクエリ結果を保存するS3バケット名>/iris/'
TBLPROPERTIES (
  'has_encrypted_data'='false', 
  'projection.enabled'='true', 
  'projection.type.type'='injected',
  'storage.location.template'='s3://<AWS Clean Roomsからクエリ結果を保存するS3バケット名>/iris/${yyyymmdd}',
)

という訳で、クエリの実行のため、APIにリクエストする際に、outputConfigurationkeyPrefixに実行当日の日付を持たせることができれば、定期的に差分取得するようなユースケースにも対応できそうですね。

最後に

今回はAWS Clean Roomsのクエリ結果をコンシューマーのアカウントでデータマートとして使用する際、どのような構成にしておくとよさそうか検討してみました。

各種SDKなどからStartProtectedQueryアクションを実行する際に、クエリ結果保存先に日付などを設定できるため、差分取得したいような場合でも利用ができそうです。

参考になりましたら幸いです。

補足

コラボレーションIDは、たとえば以下のページから確認できました。

コラボレーションID

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.